package com.ruyuan.eshop.promotion.mq.consumer.listener;

import com.alibaba.fastjson.JSON;
import com.ruyuan.eshop.common.concurrent.SafeThreadPool;
import com.ruyuan.eshop.common.constants.RocketMqConstant;
import com.ruyuan.eshop.common.core.JsonResult;
import com.ruyuan.eshop.common.exception.BaseBizException;
import com.ruyuan.eshop.common.message.PlatformPromotionUserBucketMessage;
import com.ruyuan.eshop.common.utils.JsonUtil;
import com.ruyuan.eshop.membership.api.AccountApi;
import com.ruyuan.eshop.promotion.domain.entity.SalesPromotionDO;
import com.ruyuan.eshop.promotion.mq.event.SalesPromotionCreatedEvent;
import com.ruyuan.eshop.promotion.mq.producer.DefaultProducer;
import com.ruyuan.eshop.promotion.mq.splitter.ListSplitter;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.DubboReference;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.ruyuan.eshop.common.constants.BatchSizeConstant.MESSAGE_BATCH_SIZE;

/**
 * Listener for the "sales promotion created" event.
 *
 * <p>For every consumed event it splits the user id range {@code [1, maxUserId]} into
 * fixed-size buckets, serializes one {@link PlatformPromotionUserBucketMessage} per bucket and
 * publishes those messages in batches to
 * {@link RocketMqConstant#PLATFORM_PROMOTION_SEND_USER_BUCKET_TOPIC}.
 */
@Slf4j
@Component
public class SalesPromotionCreatedEventListener implements MessageListenerConcurrently {

    /** Number of user ids advanced per bucket, i.e. the size of one push-task shard. */
    private static final int USER_BUCKET_SIZE = 1000;

    /** First user id to shard from; sharding assumes an auto-increment key that starts at 1. */
    private static final long FIRST_USER_ID = 1L;

    @DubboReference(version = "1.0.0")
    private AccountApi accountApi;
    @Resource
    private DefaultProducer defaultProducer;

    @Autowired
    @Qualifier("sharedSendMsgThreadPool")
    private SafeThreadPool sharedSendMsgThreadPool;

    /**
     * Consumes a batch of promotion-created events and fans each one out into user-bucket
     * push tasks.
     *
     * @param list                       the messages delivered by RocketMQ
     * @param consumeConcurrentlyContext the consume context (unused)
     * @return {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS} when every message was processed
     *         without an exception, otherwise {@link ConsumeConcurrentlyStatus#RECONSUME_LATER}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try {
            for (MessageExt messageExt : list) {
                SalesPromotionDO salesPromotion = parseSalesPromotion(messageExt);
                long maxUserId = queryMaxUserId();
                List<String> promotionPushTasks = buildUserBucketMessages(salesPromotion, maxUserId);
                log.info("本次推送消息用户桶数量, {}",promotionPushTasks.size());
                dispatchInBatches(promotionPushTasks);
            }
        } catch (Exception e) {
            // Boundary catch: any failure makes the whole delivered batch eligible for redelivery.
            log.error("consume error, 促销活动创建事件处理异常", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /**
     * Decodes the message body and extracts the newly created promotion from it.
     *
     * @param messageExt the raw RocketMQ message
     * @return the promotion carried by the event
     */
    private SalesPromotionDO parseSalesPromotion(MessageExt messageExt) {
        // Decode with an explicit charset instead of the JVM default so that non-ASCII promotion
        // text survives regardless of the host's locale.
        // NOTE(review): assumes the producer encodes the JSON body as UTF-8 - confirm.
        String message = new String(messageExt.getBody(), StandardCharsets.UTF_8);
        SalesPromotionCreatedEvent salesPromotionCreatedEvent =
                JSON.parseObject(message, SalesPromotionCreatedEvent.class);
        return salesPromotionCreatedEvent.getSalesPromotion();
    }

    /**
     * Asks the account API for the largest user id, which is used as the upper bound of the
     * id range to shard.
     *
     * @return the largest user id reported by the account API
     * @throws BaseBizException      if the remote call reports failure
     * @throws IllegalStateException if the call succeeds but carries no value
     */
    private long queryMaxUserId() {
        JsonResult<Long> queryMaxUserIdResult = accountApi.queryMaxUserId();
        if (!queryMaxUserIdResult.getSuccess()) {
            throw new BaseBizException(queryMaxUserIdResult.getErrorCode(), queryMaxUserIdResult.getErrorMessage());
        }
        Long maxUserId = queryMaxUserIdResult.getData();
        if (maxUserId == null) {
            // Fail with a descriptive error rather than an anonymous NPE on auto-unboxing;
            // the caller's catch still turns this into RECONSUME_LATER.
            throw new IllegalStateException("accountApi.queryMaxUserId() succeeded but returned no data");
        }
        return maxUserId;
    }

    /**
     * Builds one serialized push task per user bucket, in ascending start-id order.
     *
     * @param salesPromotion the promotion to announce
     * @param maxUserId      the largest user id to cover; no task is produced when it is below 1
     * @return the JSON form of every bucket's push task
     */
    private List<String> buildUserBucketMessages(SalesPromotionDO salesPromotion, long maxUserId) {
        // One template object is reused; only the id range changes between serializations.
        PlatformPromotionUserBucketMessage promotionPushTask = PlatformPromotionUserBucketMessage.builder()
                .promotionId(salesPromotion.getId())
                .promotionType(salesPromotion.getType())
                .mainMessage(salesPromotion.getName())
                .message("您已获得活动资格，打开APP进入活动页面")
                .informType(salesPromotion.getInformType())
                .build();

        List<String> promotionPushTasks = new ArrayList<>();
        for (long startUserId = FIRST_USER_ID; startUserId <= maxUserId; startUserId += USER_BUCKET_SIZE) {
            promotionPushTask.setStartUserId(startUserId);
            // NOTE(review): the end id of one bucket equals the start id of the next
            // (1->1001, 1001->2001, ...), so the consumer must treat the end id as exclusive
            // to avoid pushing to boundary users twice - confirm against the consumer.
            promotionPushTask.setEndUserId(startUserId + USER_BUCKET_SIZE);
            promotionPushTasks.add(JsonUtil.object2Json(promotionPushTask));
        }
        return promotionPushTasks;
    }

    /**
     * Splits the push tasks into batches of {@code MESSAGE_BATCH_SIZE} and submits one send job
     * per batch to the shared send thread pool.
     *
     * @param promotionPushTasks the serialized push tasks to publish
     */
    private void dispatchInBatches(List<String> promotionPushTasks) {
        ListSplitter splitter = new ListSplitter(promotionPushTasks, MESSAGE_BATCH_SIZE);
        while (splitter.hasNext()) {
            List<String> sendBatch = splitter.next();
            log.info("本次批次消息数量,{}",sendBatch.size());
            // NOTE(review): presumably the pool runs this asynchronously; if so, a failed send is
            // not observed by consumeMessage and the event is still acknowledged - confirm.
            sharedSendMsgThreadPool.execute(() -> {
                defaultProducer.sendMessages(RocketMqConstant.PLATFORM_PROMOTION_SEND_USER_BUCKET_TOPIC, sendBatch, "平台优惠活动用户桶消息");
            });
        }
    }
}
